tutorials/016 - EMR & Docker.ipynb (349 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[](https://github.com/aws/aws-sdk-pandas)\n",
"\n",
"# 16 - EMR & Docker"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import getpass\n",
"\n",
"import boto3\n",
"\n",
"import awswrangler as wr"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Enter your bucket name:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdin",
"output_type": "stream",
"text": [
" ··········································\n"
]
}
],
"source": [
"bucket = getpass.getpass()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Enter your Subnet ID:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdin",
"output_type": "stream",
"text": [
" ························\n"
]
}
],
"source": [
"subnet = getpass.getpass()"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Build and Upload Docker Image to ECR repository\n",
"\n",
"Replace the `{ACCOUNT_ID}` placeholder."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
},
"pycharm": {
"name": "#%%writefile Dockerfile\n"
}
},
"outputs": [],
"source": [
"%%writefile Dockerfile\n",
"\n",
"FROM amazoncorretto:8\n",
"\n",
"RUN yum -y update\n",
"RUN yum -y install yum-utils\n",
"RUN yum -y groupinstall development\n",
"\n",
"RUN yum list python3*\n",
"RUN yum -y install python3 python3-dev python3-pip python3-virtualenv\n",
"\n",
"RUN python -V\n",
"RUN python3 -V\n",
"\n",
"ENV PYSPARK_DRIVER_PYTHON python3\n",
"ENV PYSPARK_PYTHON python3\n",
"\n",
"RUN pip3 install --upgrade pip\n",
"RUN pip3 install awswrangler\n",
"\n",
"RUN python3 -c \"import awswrangler as wr\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%bash\n",
"\n",
"docker build -t 'local/emr-wrangler' .\n",
"aws ecr create-repository --repository-name emr-wrangler\n",
"docker tag local/emr-wrangler {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\n",
"eval $(aws ecr get-login --region us-east-1 --no-include-email)\n",
"docker push {ACCOUNT_ID}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating EMR Cluster"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"cluster_id = wr.emr.create_cluster(subnet, docker=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Refresh ECR credentials in the cluster (expiration time: 12h )"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'s-1B0O45RWJL8CL'"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f\"s3://{bucket}/\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Uploading application script to Amazon S3 (PySpark)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"script = \"\"\"\n",
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.appName(\"docker-awswrangler\").getOrCreate()\n",
"sc = spark.sparkContext\n",
"\n",
"print(\"Spark Initialized\")\n",
"\n",
"import awswrangler as wr\n",
"\n",
"print(f\"awswrangler version: {wr.__version__}\")\n",
"\"\"\"\n",
"\n",
"boto3.client(\"s3\").put_object(Body=script, Bucket=bucket, Key=\"test_docker.py\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submit PySpark step"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n",
"\n",
"step_id = wr.emr.submit_spark_step(cluster_id, f\"s3://{bucket}/test_docker.py\", docker_image=DOCKER_IMAGE)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Wait Step"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n",
" pass"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Terminate Cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"wr.emr.terminate_cluster(cluster_id)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Another example with custom configurations"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"cluster_id = wr.emr.create_cluster(\n",
" cluster_name=\"my-demo-cluster-v2\",\n",
" logging_s3_path=f\"s3://{bucket}/emr-logs/\",\n",
" emr_release=\"emr-6.7.0\",\n",
" subnet_id=subnet,\n",
" emr_ec2_role=\"EMR_EC2_DefaultRole\",\n",
" emr_role=\"EMR_DefaultRole\",\n",
" instance_type_master=\"m5.2xlarge\",\n",
" instance_type_core=\"m5.2xlarge\",\n",
" instance_ebs_size_master=50,\n",
" instance_ebs_size_core=50,\n",
" instance_num_on_demand_master=0,\n",
" instance_num_on_demand_core=0,\n",
" instance_num_spot_master=1,\n",
" instance_num_spot_core=2,\n",
" spot_bid_percentage_of_on_demand_master=100,\n",
" spot_bid_percentage_of_on_demand_core=100,\n",
" spot_provisioning_timeout_master=5,\n",
" spot_provisioning_timeout_core=5,\n",
" spot_timeout_to_on_demand_master=False,\n",
" spot_timeout_to_on_demand_core=False,\n",
" python3=True,\n",
" docker=True,\n",
" spark_glue_catalog=True,\n",
" hive_glue_catalog=True,\n",
" presto_glue_catalog=True,\n",
" debugging=True,\n",
" applications=[\"Hadoop\", \"Spark\", \"Hive\", \"Zeppelin\", \"Livy\"],\n",
" visible_to_all_users=True,\n",
" maximize_resource_allocation=True,\n",
" keep_cluster_alive_when_no_steps=True,\n",
" termination_protected=False,\n",
" spark_pyarrow=True,\n",
")\n",
"\n",
"wr.emr.submit_ecr_credentials_refresh(cluster_id, path=f\"s3://{bucket}/emr/\")\n",
"\n",
"DOCKER_IMAGE = f\"{wr.get_account_id()}.dkr.ecr.us-east-1.amazonaws.com/emr-wrangler:emr-wrangler\"\n",
"\n",
"step_id = wr.emr.submit_spark_step(cluster_id, f\"s3://{bucket}/test_docker.py\", docker_image=DOCKER_IMAGE)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"while wr.emr.get_step_state(cluster_id, step_id) != \"COMPLETED\":\n",
" pass\n",
"\n",
"wr.emr.terminate_cluster(cluster_id)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.9.14",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.14"
}
},
"nbformat": 4,
"nbformat_minor": 4
}